import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

public class WordCountKeyedState
{
    public static void main(String[] args) throws Exception
    {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 初始化测试单词数据流
        DataStreamSource<String> lineDS = env.addSource(new RichSourceFunction1());

        // 切割单词，并转换为元组
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordTupleDS = lineDS.flatMap(
                (String line, Collector<Tuple2<String, Integer>> ctx) ->
                {
                    Arrays.stream(line.split(" ")).forEach(word -> ctx.collect(Tuple2.of(word, 1)));
                })
                .returns(Types.TUPLE(Types.STRING, Types.INT));

        // 按照单词进行分组

        KeyedStream<Tuple2<String, Integer>, Tuple> keyedWordTupleDS = wordTupleDS.keyBy(0);

        // 对单词进行计数
        keyedWordTupleDS.flatMap(new RichFlatMapFunction1()).print();
        env.execute("KeyedState State");
    }
}




/*

这个例子来自：
https://www.cnblogs.com/ilovezihan/p/12247368.html

 */